{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "1041ae6f",
   "metadata": {},
   "source": [
    "![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "247fb2ab",
   "metadata": {},
   "source": [
    "### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6a5c8206",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6f9a9f41",
   "metadata": {},
   "source": [
    "## Load One Month of NYC Taxi/Limousine Trip Data\n",
    "\n",
    "For this notebook, we will use the New York City Taxi and Limousine Commision Trip Record Data that's available on the AWS Open Data Registry. This contains data of trips taken by taxis and for-hire vehicles in New York City. We'll save this into an iceberg table called `taxis`."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "747bee98",
   "metadata": {},
   "source": [
    "To be able to rerun the notebook several times, let's drop the table if it exists to start fresh."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "930682ce",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "DROP TABLE IF EXISTS nyc.taxis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1c37ca92",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.parquet(\"/home/iceberg/data/yellow_tripdata_2021-04.parquet\")\n",
    "df.write.saveAsTable(\"nyc.taxis\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9fddb808",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "DESCRIBE EXTENDED nyc.taxis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bcf99fb3",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT COUNT(*) as cnt\n",
    "FROM nyc.taxis"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cffd2c03",
   "metadata": {},
   "source": [
    "## Schema Evolution\n",
    "\n",
    "Adding, dropping, renaming, or altering columns is easy and safe in Iceberg. In this example, we'll rename `fare_amount` to `fare` and `trip_distance` to `distance`. We'll also add a float column `fare_per_distance_unit` immediately after `distance`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "efee8252",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "ALTER TABLE nyc.taxis RENAME COLUMN fare_amount TO fare"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "794de3a0",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "ALTER TABLE nyc.taxis RENAME COLUMN trip_distance TO distance"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "adac7564",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "ALTER TABLE nyc.taxis ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "32d7e6ef",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "ALTER TABLE nyc.taxis ALTER COLUMN distance TYPE double;"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6fb4b02a",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "ALTER TABLE nyc.taxis ALTER COLUMN distance AFTER fare;"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "81f7cc19",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "ALTER TABLE nyc.taxis\n",
    "ADD COLUMN fare_per_distance_unit float AFTER distance"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9416b498",
   "metadata": {},
   "source": [
    "Let's update the new `fare_per_distance_unit` to equal `fare` divided by `distance`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "18771ccb",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "UPDATE nyc.taxis\n",
    "SET fare_per_distance_unit = fare/distance"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "09c72ca5",
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT\n",
    "VendorID\n",
    ",tpep_pickup_datetime\n",
    ",tpep_dropoff_datetime\n",
    ",fare\n",
    ",distance\n",
    ",fare_per_distance_unit\n",
    "FROM nyc.taxis"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "37582e02",
   "metadata": {},
   "source": [
    "## Expressive SQL for Row Level Changes\n",
    "With Iceberg tables, `DELETE` queries can be used to perform row-level deletes. This is as simple as providing the table name and a `WHERE` predicate. If the filter matches an entire partition of the table, Iceberg will intelligently perform a metadata-only operation where it simply deletes the metadata for that partition.\n",
    "\n",
    "Let's perform a row-level delete for all rows that have a `fare_per_distance_unit` greater than 4 or a `distance` greater than 2. This should leave us with relatively short trips that have a relatively high fare per distance traveled."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ded820f1",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "DELETE FROM nyc.taxis\n",
    "WHERE fare_per_distance_unit > 4.0 OR distance > 2.0"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "faef3712",
   "metadata": {},
   "source": [
    "There are some fares that have a `null` for `fare_per_distance_unit` due to the distance being `0`. Let's remove those as well."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "18b69265",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "DELETE FROM nyc.taxis\n",
    "WHERE fare_per_distance_unit is null"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7b92d7db",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT\n",
    "VendorID\n",
    ",tpep_pickup_datetime\n",
    ",tpep_dropoff_datetime\n",
    ",fare\n",
    ",distance\n",
    ",fare_per_distance_unit\n",
    "FROM nyc.taxis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6d5472b7",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT COUNT(*) as cnt\n",
    "FROM nyc.taxis"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c4b157e5",
   "metadata": {},
   "source": [
    "## Partitioning\n",
    "\n",
    "A table’s partitioning can be updated in place and applied only to newly written data. Query plans are then split, using the old partition scheme for data written before the partition scheme was changed, and using the new partition scheme for data written after. People querying the table don’t even have to be aware of this split. Simple predicates in WHERE clauses are automatically converted to partition filters that prune out files with no matches. This is what’s referred to in Iceberg as *Hidden Partitioning*."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "30e3e3b7",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "ALTER TABLE nyc.taxis\n",
    "ADD PARTITION FIELD VendorID"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6fce6bb4",
   "metadata": {},
   "source": [
    "## Metadata Tables\n",
    "\n",
    "Iceberg tables contain very rich metadata that can be easily queried. For example, you can retrieve the manifest list for any snapshot, simply by querying the table's `snapshots` table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8fade1a3",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT snapshot_id, manifest_list\n",
    "FROM nyc.taxis.snapshots"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "64887133",
   "metadata": {},
   "source": [
    "The `files` table contains loads of information on data files, including column level statistics such as null counts, lower bounds, and upper bounds."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1cb712f7",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT file_path, file_format, record_count, null_value_counts, lower_bounds, upper_bounds\n",
    "FROM nyc.taxis.files"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "65deb074",
   "metadata": {},
   "source": [
    "## Time Travel\n",
    "\n",
    "The history table lists all snapshots and which parent snapshot they derive from. The `is_current_ancestor` flag let's you know if a snapshot is part of the linear history of the current snapshot of the table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bab64f90",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT *\n",
    "FROM nyc.taxis.history"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "47129d69",
   "metadata": {},
   "source": [
    "You can time-travel by altering the `current-snapshot-id` property of the table to reference any snapshot in the table's history. Let's revert the table to it's original state by traveling to the very first snapshot ID."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4c360238",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql --var df\n",
    "\n",
    "SELECT *\n",
    "FROM nyc.taxis.history"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8df43d00",
   "metadata": {},
   "outputs": [],
   "source": [
    "original_snapshot = df.head().snapshot_id\n",
    "print(original_snapshot)\n",
    "spark.sql(f\"CALL demo.system.rollback_to_snapshot('nyc.taxis', {original_snapshot})\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "955a4c52",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT\n",
    "VendorID\n",
    ",tpep_pickup_datetime\n",
    ",tpep_dropoff_datetime\n",
    ",fare\n",
    ",distance\n",
    ",fare_per_distance_unit\n",
    "FROM nyc.taxis"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "67b71c76",
   "metadata": {},
   "source": [
    "Another look at the history table shows that the original state of the table has been added as a new entry\n",
    "with the original snapshot ID."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "91b801d3",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT *\n",
    "FROM nyc.taxis.history"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "85667efc",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT COUNT(*) as cnt\n",
    "FROM nyc.taxis"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
